package com.free.bsf.redis.election;

import com.free.bsf.core.base.Callable;
import com.free.bsf.core.system.ProcessExitEvent;
import com.free.bsf.core.util.ContextUtils;
import com.free.bsf.core.util.LogUtils;
import com.free.bsf.core.util.ThreadUtils;
import com.free.bsf.redis.RedisException;
import com.free.bsf.redis.config.RedisProperties;
import com.free.bsf.redis.impl.ClusterRedisProvider;
import lombok.Getter;
import lombok.var;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 2021-03-25
 * by chejiangyi
 */
public class RedisLeaderElection {
    private Object waitLock = new Object();
    private volatile boolean isWait=false;
    @Getter
    private volatile boolean isMaster = false;
    private volatile boolean isRunning = false;
    @Getter
    private List<Callable.Action2<String,Boolean>> listeners= new ArrayList<>();

    public RedisLeaderElection(){
        ProcessExitEvent.register(()->{
            exitElection();
        },1,false);
    }

    public void doElection(String name) {
        var redis = ContextUtils.getBean(ClusterRedisProvider.class,false);
        if(redis==null){
            throw new RedisException("redis配置未开启");
        }
        isRunning=true;
        ThreadUtils.system().submit("redis选举"+name,()->{
            while (isRunning&&!ThreadUtils.system().isShutdown()){
                boolean isLock = redis.tryLock(name,10,()->{
                    notifyListeners(name,true);
                    synchronized (waitLock) {
                        isWait=true;
                        while (isWait==true&&!ThreadUtils.system().isShutdown()) {
                            try {
                                waitLock.wait();
                            } catch (Exception e) {
                                LogUtils.error(this.getClass(), RedisProperties.Project, "【严重】redis 选举锁异常", e);
                            }
                        }
                    }
                    notifyListeners(name,false);
                });
                if(isRunning&&isLock==false) {
                    ThreadUtils.sleep(5000);
                }
            }
        });
    }

    public void exitElection(){
        isRunning=false;
        synchronized (waitLock) {
            isWait = false;
            waitLock.notifyAll();
        }
    }

    private void notifyListeners(String name,boolean isMaster){
        this.isMaster=isMaster;
        if(listeners.size()>0) {
            //异步通知
            ThreadUtils.system().submit("redis选举通知监听器任务", () -> {
                for (var l : listeners) {
                    try {
                        l.invoke(name, isMaster);
                    } catch (Exception e) {
                        LogUtils.error(this.getClass(), RedisProperties.Project, "redis选举通知监听器出错", e);
                    }
                }
            });
        }
    }
}

